3760bf9cc3f9ad948486e706d21e724656a34e15,enterprise/main/java/org/neo4j/kernel/ha/MasterClient.java,MasterClient,sendRequest,#RequestType#SlaveContext#Serializer#Deserializer#,127

Before Change


            @SuppressWarnings( "unchecked" )
            BlockingReadHandler<ChannelBuffer> reader = (BlockingReadHandler<ChannelBuffer>)
                    channel.getPipeline().get( "blockingHandler" );
            Pair<ChannelBuffer, Boolean> messageContext = readNextMessage( channelContext, reader );
            ChannelBuffer message = messageContext.first();
            T response = deserializer.read( message );
            String[] datasources = type.includesSlaveContext() ? readTransactionStreamHeader( message ) : null;
            if ( messageContext.other() )
            {
                // This message consists of multiple chunks, apply transactions as early as possible
                message = createDynamicBufferFrom( message );
                boolean more = true;
                while ( more )
                {
                    Pair<ChannelBuffer, Boolean> followingMessage = readNextMessage( channelContext, reader );
                    more = followingMessage.other();
                    message.writeBytes( followingMessage.first() );
                    message = applyFullyAvailableTransactions( datasources, message );
                }
            }
            
            // Here's the remaining transactions if the message consisted of multiple chunks,
            // or all transactions if it only consisted of one chunk.
            TransactionStream txStreams = type.includesSlaveContext() ?
                    readTransactionStreams( datasources, message ) : TransactionStream.EMPTY;
            return new Response<T>( response, txStreams );
        }
        catch ( ClosedChannelException e )

After Change



            // Read the response
            @SuppressWarnings( "unchecked" )
            BlockingReadHandler<ChannelBuffer> reader = (BlockingReadHandler<ChannelBuffer>)
                    channel.getPipeline().get( "blockingHandler" );
            final Triplet<Channel, ChannelBuffer, ByteBuffer> finalChannelContext = channelContext;
            DechunkingChannelBuffer dechunkingBuffer = new DechunkingChannelBuffer( ChannelBuffers.dynamicBuffer(), reader )
            {
                @Override
                protected ChannelBuffer readNext()
                {
                    ChannelBuffer result = super.readNext();
                    if ( result == null )
                    {
                        channelPool.dispose( finalChannelContext );
                        throw new HaCommunicationException( "Channel has been closed" );
                    }
                    return result;
                }
            };
            T response = deserializer.read( dechunkingBuffer );
            String[] datasources = type.includesSlaveContext() ? readTransactionStreamHeader( dechunkingBuffer ) : null;
            while ( dechunkingBuffer.expectsMoreChunks() )
            {
                applyFullyAvailableTransactions( datasources, dechunkingBuffer );
                if ( dechunkingBuffer.expectsMoreChunks() )
                {
                    dechunkingBuffer.forceReadNextChunk();
                }
            }
            
            // Here's the remaining transactions if the message consisted of multiple chunks,
            // or all transactions if it only consisted of one chunk.
            TransactionStream txStreams = type.includesSlaveContext() ?
                    readTransactionStreams( datasources, dechunkingBuffer ) : TransactionStream.EMPTY;
            return new Response<T>( response, txStreams );
        }
        catch ( ClosedChannelException e )